Skip to content

Feature: Parallel message processing #796

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 22 commits into
base: main
Choose a base branch
from
Open

Conversation

1yam
Copy link
Member

@1yam 1yam commented May 21, 2025

The goal of this PR is to process messages in parallel based on address

Related Clickup or Jira tickets : ALEPH-XXX

Self proofreading checklist

  • Is my code clear enough and well documented
  • Are my files well typed
  • New translations have been added or updated if new strings have been introduced in the frontend
  • Database migrations file are included
  • Are there enough tests
  • Documentation has been included (for new feature)

Changes

This pull request introduces significant enhancements to the handling of pending messages in the database and the processing pipeline. The changes include the addition of a new accessor method for fetching batches of messages by address, improvements to the message processing logic for concurrency, and updates to type hints and imports for better clarity and functionality.

Database Accessor Enhancements:

  • Added a new method, get_next_pending_messages_by_address, in src/aleph/db/accessors/pending_messages.py to fetch batches of pending messages grouped by address. This method supports filtering by fetched status, excluded hashes, and excluded addresses, and ensures efficient querying by limiting results to a specified batch size.

Message Processing Pipeline Improvements:

  • Refactored the process_messages method in src/aleph/jobs/process_pending_messages.py to support concurrent processing of messages using asyncio tasks and semaphores. This allows multiple addresses to be processed in parallel while maintaining a limit on the number of concurrent tasks.
  • Introduced a new process_message_batch method for handling batches of messages associated with the same address, ensuring that tasks are properly cleaned up upon completion.
  • Added tracking for processed message hashes and active addresses to avoid duplicate processing and ensure efficient task management. [1] [2]

Type Hint and Import Updates:

  • Updated type hints across src/aleph/db/accessors/pending_messages.py and src/aleph/jobs/process_pending_messages.py to include List and Set for improved type clarity. [1] [2]
  • Adjusted imports in src/aleph/jobs/process_pending_messages.py to include new dependencies such as PendingMessageDb and get_next_pending_messages_by_address. [1] [2]

These changes enhance the scalability and maintainability of the system by improving the efficiency of pending message retrieval and processing.

How to test

To be tested you can sync a node

Process

Print screen / video

Upload here print screens or videos showing the changes if relevant.

Notes

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant